Skip to content
This repository was archived by the owner on Oct 16, 2020. It is now read-only.

Commit 2af2a85

Browse files
authored
More Observables (#291)
1 parent e7a79a5 commit 2af2a85

File tree

9 files changed

+255
-240
lines changed

9 files changed

+255
-240
lines changed

src/fs.ts

Lines changed: 68 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,29 @@
1+
import { Observable } from '@reactivex/rxjs';
2+
import { Glob } from 'glob';
13
import * as fs from 'mz/fs';
2-
import { LanguageClient } from './lang-handler';
3-
import glob = require('glob');
4-
import iterate from 'iterare';
54
import { Span } from 'opentracing';
65
import Semaphore from 'semaphore-async-await';
6+
import { LanguageClient } from './lang-handler';
77
import { InMemoryFileSystem } from './memfs';
8-
import { tracePromise } from './tracing';
8+
import { traceObservable } from './tracing';
99
import { normalizeUri, uri2path } from './util';
1010

1111
export interface FileSystem {
1212
/**
1313
* Returns all files in the workspace under base
1414
*
1515
* @param base A URI under which to search, resolved relative to the rootUri
16-
* @return A promise that is fulfilled with an array of URIs
16+
* @return An Observable that emits URIs
1717
*/
18-
getWorkspaceFiles(base?: string, childOf?: Span): Promise<Iterable<string>>;
18+
getWorkspaceFiles(base?: string, childOf?: Span): Observable<string>;
1919

2020
/**
2121
* Returns the content of a text document
2222
*
2323
* @param uri The URI of the text document, resolved relative to the rootUri
24-
* @return A promise that is fulfilled with the text document content
24+
* @return An Observable that emits the text document content
2525
*/
26-
getTextDocumentContent(uri: string, childOf?: Span): Promise<string>;
26+
getTextDocumentContent(uri: string, childOf?: Span): Observable<string>;
2727
}
2828

2929
export class RemoteFileSystem implements FileSystem {
@@ -34,17 +34,18 @@ export class RemoteFileSystem implements FileSystem {
3434
* The files request is sent from the server to the client to request a list of all files in the workspace or inside the directory of the base parameter, if given.
3535
* A language server can use the result to index files by filtering and doing a content request for each text document of interest.
3636
*/
37-
async getWorkspaceFiles(base?: string, childOf = new Span()): Promise<Iterable<string>> {
38-
return iterate(await this.client.workspaceXfiles({ base }, childOf))
37+
getWorkspaceFiles(base?: string, childOf = new Span()): Observable<string> {
38+
return this.client.workspaceXfiles({ base }, childOf)
39+
.mergeMap(textDocuments => textDocuments)
3940
.map(textDocument => normalizeUri(textDocument.uri));
4041
}
4142

4243
/**
4344
* The content request is sent from the server to the client to request the current content of any text document. This allows language servers to operate without accessing the file system directly.
4445
*/
45-
async getTextDocumentContent(uri: string, childOf = new Span()): Promise<string> {
46-
const textDocument = await this.client.textDocumentXcontent({ textDocument: { uri } }, childOf);
47-
return textDocument.text;
46+
getTextDocumentContent(uri: string, childOf = new Span()): Observable<string> {
47+
return this.client.textDocumentXcontent({ textDocument: { uri } }, childOf)
48+
.map(textDocument => textDocument.text);
4849
}
4950
}
5051

@@ -62,24 +63,36 @@ export class LocalFileSystem implements FileSystem {
6263
return uri2path(uri);
6364
}
6465

65-
async getWorkspaceFiles(base = this.rootUri): Promise<Iterable<string>> {
66+
getWorkspaceFiles(base = this.rootUri): Observable<string> {
6667
if (!base.endsWith('/')) {
6768
base += '/';
6869
}
6970
const cwd = this.resolveUriToPath(base);
70-
const files = await new Promise<string[]>((resolve, reject) => {
71-
glob('*', {
71+
return new Observable<string>(subscriber => {
72+
const globber = new Glob('*', {
7273
cwd,
7374
nodir: true,
7475
matchBase: true,
7576
follow: true
76-
}, (err, matches) => err ? reject(err) : resolve(matches));
77+
});
78+
globber.on('match', (file: string) => {
79+
subscriber.next(normalizeUri(base + file));
80+
});
81+
globber.on('error', (err: any) => {
82+
subscriber.error(err);
83+
});
84+
globber.on('end', () => {
85+
subscriber.complete();
86+
});
87+
return () => {
88+
globber.abort();
89+
};
7790
});
78-
return iterate(files).map(file => normalizeUri(base + file));
7991
}
8092

81-
async getTextDocumentContent(uri: string): Promise<string> {
82-
return fs.readFile(this.resolveUriToPath(uri), 'utf8');
93+
getTextDocumentContent(uri: string): Observable<string> {
94+
const filePath = this.resolveUriToPath(uri);
95+
return Observable.fromPromise(fs.readFile(filePath, 'utf8'));
8396
}
8497
}
8598

@@ -91,14 +104,14 @@ export class LocalFileSystem implements FileSystem {
91104
export class FileSystemUpdater {
92105

93106
/**
94-
* Promise for a pending or fulfilled structure fetch
107+
* Observable for a pending or completed structure fetch
95108
*/
96-
private structureFetch?: Promise<void>;
109+
private structureFetch?: Observable<never>;
97110

98111
/**
99-
* Map from URI to Promise of pending or fulfilled content fetch
112+
* Map from URI to Observable of pending or completed content fetch
100113
*/
101-
private fetches = new Map<string, Promise<void>>();
114+
private fetches = new Map<string, Observable<never>>();
102115

103116
/**
104117
* Limits concurrent fetches to not fetch thousands of files in parallel
@@ -112,20 +125,23 @@ export class FileSystemUpdater {
112125
*
113126
* @param uri URI of the file to fetch
114127
* @param childOf A parent span for tracing
128+
* @return Observable that completes when the fetch is finished
115129
*/
116-
async fetch(uri: string, childOf = new Span()): Promise<void> {
130+
fetch(uri: string, childOf = new Span()): Observable<never> {
117131
// Limit concurrent fetches
118-
const promise = this.concurrencyLimit.execute(async () => {
119-
try {
120-
const content = await this.remoteFs.getTextDocumentContent(uri);
132+
const observable = Observable.fromPromise(this.concurrencyLimit.wait())
133+
.mergeMap(() => this.remoteFs.getTextDocumentContent(uri))
134+
.do(content => {
135+
this.concurrencyLimit.signal();
121136
this.inMemoryFs.add(uri, content);
122-
} catch (err) {
137+
}, err => {
123138
this.fetches.delete(uri);
124-
throw err;
125-
}
126-
});
127-
this.fetches.set(uri, promise);
128-
return promise;
139+
})
140+
.ignoreElements()
141+
.publishReplay()
142+
.refCount();
143+
this.fetches.set(uri, observable);
144+
return observable;
129145
}
130146

131147
/**
@@ -134,9 +150,10 @@ export class FileSystemUpdater {
134150
*
135151
* @param uri URI of the file to ensure
136152
* @param childOf An OpenTracing span for tracing
153+
* @return Observable that completes when the file was fetched
137154
*/
138-
ensure(uri: string, childOf = new Span()): Promise<void> {
139-
return tracePromise('Ensure content', childOf, span => {
155+
ensure(uri: string, childOf = new Span()): Observable<never> {
156+
return traceObservable('Ensure content', childOf, span => {
140157
span.addTags({ uri });
141158
return this.fetches.get(uri) || this.fetch(uri, span);
142159
});
@@ -147,20 +164,20 @@ export class FileSystemUpdater {
147164
*
148165
* @param childOf A parent span for tracing
149166
*/
150-
fetchStructure(childOf = new Span()): Promise<void> {
151-
const promise = tracePromise('Fetch workspace structure', childOf, async span => {
152-
try {
153-
const uris = await this.remoteFs.getWorkspaceFiles(undefined, span);
154-
for (const uri of uris) {
167+
fetchStructure(childOf = new Span()): Observable<never> {
168+
const observable = traceObservable('Fetch workspace structure', childOf, span =>
169+
this.remoteFs.getWorkspaceFiles(undefined, span)
170+
.do(uri => {
155171
this.inMemoryFs.add(uri);
156-
}
157-
} catch (err) {
158-
this.structureFetch = undefined;
159-
throw err;
160-
}
161-
});
162-
this.structureFetch = promise;
163-
return promise;
172+
}, err => {
173+
this.structureFetch = undefined;
174+
})
175+
.ignoreElements()
176+
.publishReplay()
177+
.refCount()
178+
);
179+
this.structureFetch = observable;
180+
return observable;
164181
}
165182

166183
/**
@@ -169,8 +186,8 @@ export class FileSystemUpdater {
169186
*
170187
* @param span An OpenTracing span for tracing
171188
*/
172-
ensureStructure(childOf = new Span()) {
173-
return tracePromise('Ensure structure', childOf, span => {
189+
ensureStructure(childOf = new Span()): Observable<never> {
190+
return traceObservable('Ensure structure', childOf, span => {
174191
return this.structureFetch || this.fetchStructure(span);
175192
});
176193
}

src/lang-handler.ts

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ export interface LanguageClient {
2626
* any text document. This allows language servers to operate without accessing the file system
2727
* directly.
2828
*/
29-
textDocumentXcontent(params: TextDocumentContentParams, childOf?: Span): Promise<TextDocumentItem>;
29+
textDocumentXcontent(params: TextDocumentContentParams, childOf?: Span): Observable<TextDocumentItem>;
3030

3131
/**
3232
* The files request is sent from the server to the client to request a list of all files in the
3333
* workspace or inside the directory of the `base` parameter, if given.
3434
*/
35-
workspaceXfiles(params: WorkspaceFilesParams, childOf?: Span): Promise<TextDocumentIdentifier[]>;
35+
workspaceXfiles(params: WorkspaceFilesParams, childOf?: Span): Observable<TextDocumentIdentifier[]>;
3636

3737
/**
3838
* The log message notification is sent from the server to the client to ask
@@ -44,7 +44,7 @@ export interface LanguageClient {
4444
* The cache get request is sent from the server to the client to request the value of a cache
4545
* item identified by the provided key.
4646
*/
47-
xcacheGet(params: CacheGetParams, childOf?: Span): Promise<any>;
47+
xcacheGet(params: CacheGetParams, childOf?: Span): Observable<any>;
4848

4949
/**
5050
* The cache set notification is sent from the server to the client to set the value of a cache
@@ -65,7 +65,7 @@ export interface LanguageClient {
6565
* Can occur as as a result of rename or executeCommand (code action).
6666
* @param params The edits to apply to the workspace
6767
*/
68-
workspaceApplyEdit(params: ApplyWorkspaceEditParams, childOf?: Span): Promise<ApplyWorkspaceEditResponse>;
68+
workspaceApplyEdit(params: ApplyWorkspaceEditParams, childOf?: Span): Observable<ApplyWorkspaceEditResponse>;
6969
}
7070

7171
/**
@@ -145,16 +145,16 @@ export class RemoteLanguageClient {
145145
* any text document. This allows language servers to operate without accessing the file system
146146
* directly.
147147
*/
148-
textDocumentXcontent(params: TextDocumentContentParams, childOf = new Span()): Promise<TextDocumentItem> {
149-
return this.request('textDocument/xcontent', params, childOf).toPromise();
148+
textDocumentXcontent(params: TextDocumentContentParams, childOf = new Span()): Observable<TextDocumentItem> {
149+
return this.request('textDocument/xcontent', params, childOf);
150150
}
151151

152152
/**
153153
* The files request is sent from the server to the client to request a list of all files in the
154154
* workspace or inside the directory of the `base` parameter, if given.
155155
*/
156-
workspaceXfiles(params: WorkspaceFilesParams, childOf = new Span()): Promise<TextDocumentIdentifier[]> {
157-
return this.request('workspace/xfiles', params, childOf).toPromise();
156+
workspaceXfiles(params: WorkspaceFilesParams, childOf = new Span()): Observable<TextDocumentIdentifier[]> {
157+
return this.request('workspace/xfiles', params, childOf);
158158
}
159159

160160
/**
@@ -169,8 +169,8 @@ export class RemoteLanguageClient {
169169
* The cache get request is sent from the server to the client to request the value of a cache
170170
* item identified by the provided key.
171171
*/
172-
xcacheGet(params: CacheGetParams, childOf = new Span()): Promise<any> {
173-
return this.request('xcache/get', params, childOf).toPromise();
172+
xcacheGet(params: CacheGetParams, childOf = new Span()): Observable<any> {
173+
return this.request('xcache/get', params, childOf);
174174
}
175175

176176
/**
@@ -179,7 +179,7 @@ export class RemoteLanguageClient {
179179
* because the server is not supposed to act differently if the cache set failed.
180180
*/
181181
xcacheSet(params: CacheSetParams): void {
182-
return this.notify('xcache/set', params);
182+
this.notify('xcache/set', params);
183183
}
184184

185185
/**
@@ -197,7 +197,7 @@ export class RemoteLanguageClient {
197197
*
198198
* @param params The edits to apply.
199199
*/
200-
workspaceApplyEdit(params: ApplyWorkspaceEditParams, childOf = new Span()): Promise<ApplyWorkspaceEditResponse> {
201-
return this.request('workspace/applyEdit', params, childOf).toPromise();
200+
workspaceApplyEdit(params: ApplyWorkspaceEditParams, childOf = new Span()): Observable<ApplyWorkspaceEditResponse> {
201+
return this.request('workspace/applyEdit', params, childOf);
202202
}
203203
}

src/packages.ts

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { Disposable } from './disposable';
88
import { FileSystemUpdater } from './fs';
99
import { Logger, NoopLogger } from './logging';
1010
import { InMemoryFileSystem } from './memfs';
11-
import { tracePromise } from './tracing';
11+
import { traceObservable } from './tracing';
1212

1313
/**
1414
* Schema of a package.json file
@@ -145,37 +145,44 @@ export class PackageManager extends EventEmitter implements Disposable {
145145

146146
/**
147147
* Gets the content of the closest package.json known to to the DependencyManager in the ancestors of a URI
148+
*
149+
* @return Observable that emits a single PackageJson or never
148150
*/
149-
async getClosestPackageJson(uri: string, span = new Span()): Promise<PackageJson | undefined> {
150-
await this.updater.ensureStructure();
151-
const packageJsonUri = this.getClosestPackageJsonUri(uri);
152-
if (!packageJsonUri) {
153-
return undefined;
154-
}
155-
return await this.getPackageJson(packageJsonUri, span);
151+
getClosestPackageJson(uri: string, span = new Span()): Observable<PackageJson> {
152+
return this.updater.ensureStructure()
153+
.concat(Observable.defer(() => {
154+
const packageJsonUri = this.getClosestPackageJsonUri(uri);
155+
if (!packageJsonUri) {
156+
return Observable.empty<never>();
157+
}
158+
return this.getPackageJson(packageJsonUri, span);
159+
}));
156160
}
157161

158162
/**
159163
* Returns the parsed package.json of the passed URI
160164
*
161165
* @param uri URI of the package.json
166+
* @return Observable that emits a single PackageJson or never
162167
*/
163-
async getPackageJson(uri: string, childOf = new Span()): Promise<PackageJson> {
164-
return tracePromise('Get package.json', childOf, async span => {
168+
getPackageJson(uri: string, childOf = new Span()): Observable<PackageJson> {
169+
return traceObservable('Get package.json', childOf, span => {
165170
span.addTags({ uri });
166171
if (uri.includes('/node_modules/')) {
167-
throw new Error(`Not an own package.json: ${uri}`);
172+
return Observable.throw(new Error(`Not an own package.json: ${uri}`));
168173
}
169174
let packageJson = this.packages.get(uri);
170175
if (packageJson) {
171-
return packageJson;
176+
return Observable.of(packageJson);
172177
}
173-
await this.updater.ensure(uri, span);
174-
packageJson = this.packages.get(uri)!;
175-
if (!packageJson) {
176-
throw new Error(`Expected ${uri} to be registered in PackageManager`);
177-
}
178-
return packageJson;
178+
return this.updater.ensure(uri, span)
179+
.concat(Observable.defer(() => {
180+
packageJson = this.packages.get(uri)!;
181+
if (!packageJson) {
182+
return Observable.throw(new Error(`Expected ${uri} to be registered in PackageManager`));
183+
}
184+
return Observable.of(packageJson);
185+
}));
179186
});
180187
}
181188

0 commit comments

Comments
 (0)