-
Notifications
You must be signed in to change notification settings - Fork 91
/
Copy pathkernel_tests.py
357 lines (312 loc) · 13.4 KB
/
kernel_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
"""Manually crafted tests testing specific features of the kernel.
"""
import unittest
import jupyter_kernel_test
import time
import os
from jupyter_client.manager import start_new_kernel
class SwiftKernelTests(jupyter_kernel_test.KernelTests):
    """Hand-written feature tests for the Swift kernel.

    All the tests in this class run against the same kernel instance, so a
    test that starts a long-running cell must interrupt it and consume the
    resulting messages before returning.
    """

    language_name = 'swift'
    kernel_name = 'swift'

    code_hello_world = 'print("hello, world!")'

    code_execute_result = [
        {'code': 'let x = 2; x', 'result': 'Use `print()` to show values.\n'},
    ]

    code_generate_error = 'varThatIsntDefined'

    def setUp(self):
        """Discard any messages left over from the previous test."""
        self.flush_channels()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _run_ok(self, code):
        """Execute `code` and assert that the reply status is 'ok'.

        Returns the `(reply, output_msgs)` pair from `execute_helper`.
        """
        reply, output_msgs = self.execute_helper(code=code)
        self.assertEqual(reply['content']['status'], 'ok')
        return reply, output_msgs

    def _next_status_msg(self):
        """Consume iopub messages until a 'status' message; return it."""
        while True:
            msg = self.kc.iopub_channel.get_msg(timeout=1)
            if msg['msg_type'] == 'status':
                return msg

    def _assert_completion_matches(self, prefix, expected):
        """Request completion of `prefix` and compare the reported matches."""
        self.kc.complete(prefix)
        reply = self.kc.get_shell_msg()
        self.assertEqual(reply['content']['matches'], expected)
        self.flush_channels()

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_graphics_matplotlib(self):
        """An inline matplotlib plot is delivered as an 'image/png' payload."""
        self._run_ok("""
%include "EnableIPythonDisplay.swift"
""")
        self._run_ok("""
let np = Python.import("numpy")
let plt = Python.import("matplotlib.pyplot")
IPythonDisplay.shell.enable_matplotlib("inline")
""")
        _, output_msgs = self._run_ok("""
let ys = np.arange(0, 10, 0.01)
plt.plot(ys)
plt.show()
""")
        self.assertIn('image/png', output_msgs[0]['content']['data'])

    def test_extensions(self):
        """An extension declared in a later cell replaces the earlier one."""
        self._run_ok("""
struct Foo{}
""")
        self._run_ok("""
extension Foo { func f() -> Int { return 1 } }
""")
        _, output_msgs = self._run_ok("""
print("Value of Foo().f() is", Foo().f())
""")
        self.assertIn("Value of Foo().f() is 1",
                      output_msgs[0]['content']['text'])

        # Declare the same extension method again with a different result.
        self._run_ok("""
extension Foo { func f() -> Int { return 2 } }
""")
        _, output_msgs = self._run_ok("""
print("Value of Foo().f() is", Foo().f())
""")
        self.assertIn("Value of Foo().f() is 2",
                      output_msgs[0]['content']['text'])

    def test_gradient_across_cells_error(self):
        """Differentiating a function not marked '@differentiable' errors."""
        self._run_ok("""
import _Differentiation
func square(_ x : Float) -> Float { return x * x }
""")
        _, output_msgs = self._run_ok("""
print("5^2 is", square(5))
""")
        self.assertIn("5^2 is 25.0", output_msgs[0]['content']['text'])

        reply, output_msgs = self.execute_helper(code="""
print("gradient of square at 5 is", gradient(at: 5, in: square))
""")
        self.assertEqual(reply['content']['status'], 'error')
        self.assertIn(
            "cannot differentiate functions that have not been marked "
            "'@differentiable'",
            reply['content']['traceback'][0])

    def test_gradient_across_cells(self):
        """A '@differentiable' function can be differentiated in a later cell."""
        self._run_ok("""
import _Differentiation
@differentiable
func square(_ x : Float) -> Float { return x * x }
""")
        _, output_msgs = self._run_ok("""
print("5^2 is", square(5))
""")
        self.assertIn("5^2 is 25.0", output_msgs[0]['content']['text'])

        _, output_msgs = self._run_ok("""
print("gradient of square at 5 is", gradient(at: 5, in: square))
""")
        self.assertIn("gradient of square at 5 is 10.0",
                      output_msgs[0]['content']['text'])

    def test_error_runtime(self):
        """A fatal error reports its message on stdout plus a stack trace."""
        reply, output_msgs = self._run_ok("""
func a() { fatalError("oops") }
""")
        # The *_cell execution counts are only needed by the disabled
        # assertions at the end of this test; keep them for TF-495.
        a_cell = reply['content']['execution_count']

        reply, output_msgs = self._run_ok("""
print("hello")
print("world")
func b() { a() }
""")
        b_cell = reply['content']['execution_count']

        reply, output_msgs = self.execute_helper(code="""
b()
""")
        self.assertEqual(reply['content']['status'], 'error')
        call_cell = reply['content']['execution_count']

        stdout = output_msgs[0]['content']['text']
        self.assertIn('Fatal error: oops', stdout)

        traceback = output_msgs[1]['content']['traceback']
        all_tracebacks = '\n'.join(traceback)
        self.assertIn('Current stack trace:', all_tracebacks)
        self.assertIn('a()', all_tracebacks)
        # TODO(TF-495): Reenable these assertions.
        # self.assertIn('b() at <Cell %d>:4:24' % b_cell, all_tracebacks)
        # self.assertIn('main at <Cell %d>:2:13' % call_cell, all_tracebacks)

    def test_interrupt_execution(self):
        """An interrupt stops a running cell and the kernel stays usable."""
        # Execute something to trigger debugger initialization, so that the
        # next cell executes quickly.
        self.execute_helper(code='1 + 1')

        self.kc.execute(code="""while true {}""")

        # Give the kernel some time to actually start execution, because it
        # ignores interrupts that arrive when it's not actually executing.
        time.sleep(1)

        msg = self.kc.iopub_channel.get_msg(timeout=1)
        self.assertEqual(msg['content']['execution_state'], 'busy')

        self.km.interrupt_kernel()
        reply = self.kc.get_shell_msg(timeout=1)
        self.assertEqual(reply['content']['status'], 'error')

        status = self._next_status_msg()
        self.assertEqual(status['content']['execution_state'], 'idle')

        # Check that the kernel can still execute things after handling an
        # interrupt.
        _, output_msgs = self._run_ok("""print("Hello world")""")
        for msg in output_msgs:
            if msg['msg_type'] == 'stream' and \
                    msg['content']['name'] == 'stdout':
                self.assertIn('Hello world', msg['content']['text'])
                break
        else:
            # Without this the test would pass vacuously when the kernel
            # produced no stdout at all.
            self.fail('no stdout stream message received after interrupt')

    def test_async_stdout(self):
        """Stdout is delivered while the cell that printed it is still running."""
        # Execute something to trigger debugger initialization, so that the
        # next cell executes quickly.
        self.execute_helper(code='1 + 1')

        # Test that we receive stdout while execution is happening by printing
        # something and then entering an infinite loop.
        self.kc.execute(code="""
print("some stdout")
while true {}
""")

        try:
            # Give the kernel some time to send out the stdout.
            time.sleep(1)

            # Check that the kernel has sent out the stdout.
            while True:
                msg = self.kc.iopub_channel.get_msg(timeout=1)
                if msg['msg_type'] == 'stream' and \
                        msg['content']['name'] == 'stdout':
                    self.assertIn('some stdout', msg['content']['text'])
                    break
        finally:
            # Interrupt execution and consume all messages, so that subsequent
            # tests can run. (All the tests in this class run against the same
            # instance of the kernel.) Done in `finally` so that a failure
            # above does not leave the shared kernel spinning forever.
            self.km.interrupt_kernel()
            self.kc.get_shell_msg(timeout=1)
            self._next_status_msg()

    @unittest.skipIf(
        os.environ.get('TENSORFLOW_USE_STANDARD_TOOLCHAIN') == 'YES',
        'Completion not suported on standard toolchains')
    def test_swift_completion(self):
        """Completion works and follows %disableCompletion/%enableCompletion."""
        self._run_ok("""
func aFunctionToComplete() {}
""")
        self._assert_completion_matches('aFunctionToC',
                                        ['aFunctionToComplete()'])

        self._run_ok("""
%disableCompletion
""")
        self._assert_completion_matches('aFunctionToC', [])

        self._run_ok("""
%enableCompletion
""")
        self._assert_completion_matches('aFunctionToC',
                                        ['aFunctionToComplete()'])

    def test_swift_clear_output(self):
        """Printing ESC[2J yields a 'clear_output' message between the streams."""
        _, output_msgs = self._run_ok(r"""
print("before the clear")
print("\u{001B}[2J")
print("after the clear")
""")
        expected_msgs = [
            {
                'msg_type': 'stream',
                'content': {
                    'name': 'stdout',
                    'text': 'before the clear\r\n',
                },
            },
            {
                'msg_type': 'clear_output',
                'content': {
                    'wait': False
                }
            },
            {
                'msg_type': 'stream',
                'content': {
                    'name': 'stdout',
                    'text': '\r\nafter the clear\r\n',
                },
            },
        ]
        for index, expected in enumerate(expected_msgs):
            # Compare only the fields under test; headers vary per message.
            actual = {k: output_msgs[index][k]
                      for k in ['msg_type', 'content']}
            self.assertEqual(actual, expected)

    def test_show_tensor(self):
        """A bare Tensor expression hints at `print()` when a result is shown."""
        _, output_msgs = self._run_ok("""
import TensorFlow
Tensor([1, 2, 3])
""")
        # Only check the hint when the first message carries a data payload.
        if 'data' in output_msgs[0]['content']:
            self.assertIn(
                "Use `print()` to show values",
                output_msgs[0]['content']['data']['text/plain'])
# Class for tests that need their own kernel. (`SwiftKernelTests` uses one
# kernel for all the tests.)
class OwnKernelTests(unittest.TestCase):
    """Tests that each start a fresh Swift kernel of their own."""

    def _start_kernel(self):
        """Start a Swift kernel, register its teardown, and return the client.

        Cleanups run in reverse registration order, so the client channels are
        stopped before the kernel itself is shut down.
        """
        km, kc = start_new_kernel(kernel_name='swift')
        # Without these, every test would leak a kernel process and its
        # sockets for the remainder of the test run.
        self.addCleanup(km.shutdown_kernel, now=True)
        self.addCleanup(kc.stop_channels)
        return kc

    def test_process_killed(self):
        """A cell that exits the process is reported as 'Process killed'."""
        kc = self._start_kernel()
        kc.execute("""
import Glibc
exit(0)
""")
        messages = self.wait_for_idle(kc)

        errors = [m for m in messages if m['header']['msg_type'] == 'error']
        self.assertTrue(errors)
        for error in errors:
            self.assertEqual(['Process killed'],
                             error['content']['traceback'])

    def test_install_after_execute(self):
        """%install is rejected once some code has already been executed."""
        # The kernel is supposed to refuse to install package after executing
        # code.
        kc = self._start_kernel()
        kc.execute('1 + 1')
        self.wait_for_idle(kc)
        kc.execute("""
%install DummyPackage DummyPackage
""")
        messages = self.wait_for_idle(kc)

        errors = [m for m in messages if m['header']['msg_type'] == 'error']
        self.assertTrue(errors)
        for error in errors:
            self.assertIn('Install Error: Packages can only be installed '
                          'during the first cell execution.',
                          error['content']['traceback'][0])

    def test_install_after_execute_blank(self):
        """A blank first cell does not prevent a later %install attempt."""
        # If the user executes blank code, the kernel is supposed to try
        # to install packages. In particular, Colab sends a blank execution
        # request to the kernel when it starts up, and it's important that this
        # doesn't block package installation.
        kc = self._start_kernel()
        kc.execute('\n\n\n')
        self.wait_for_idle(kc)
        kc.execute("""
%install DummyPackage DummyPackage
""")
        messages = self.wait_for_idle(kc)

        # DummyPackage doesn't exist, so package installation won't actually
        # succeed. So we just assert that the kernel tries to install it.
        stdout = ''.join(
            m['content']['text']
            for m in messages
            if m['header']['msg_type'] == 'stream'
            and m['content']['name'] == 'stdout')
        self.assertIn('Installing packages:', stdout)

    def wait_for_idle(self, kc):
        """Collect iopub messages from `kc` until the kernel reports idle.

        Returns every message received, including the final idle status
        message. Each read waits up to 30 seconds.
        """
        messages = []
        while True:
            message = kc.get_iopub_msg(timeout=30)
            messages.append(message)
            if message['header']['msg_type'] == 'status' and \
                    message['content']['execution_state'] == 'idle':
                return messages