'''
Provides messages from container logs (STDOUT and STDERR).
'''
import os
import signal
import docker
from multiprocessing import Process, Queue
[docs]def monitor_logs(chute_name, queue, tail=200):
"""
Iterate over log messages from a container and add them to the queue
for consumption. This function will block and wait for new messages
from the container. Use the queue to interface with async code.
tail: number of lines to retrieve from log history; the string "all"
is also valid, but highly discouraged for performance reasons.
"""
client = docker.DockerClient(base_url="unix://var/run/docker.sock", version='auto')
container = client.containers.get(chute_name)
output = container.logs(stdout=True, stderr=True,
stream=True, timestamps=True, follow=True,
tail=tail)
for line in output:
# I have grown to distrust Docker streaming functions. It may
# return a string; it may return an object. If it is a string,
# separate the timestamp portion from the rest of the message.
if isinstance(line, basestring):
parts = line.split(" ", 1)
if len(parts) > 1:
queue.put({
'timestamp': parts[0],
'message': parts[1].rstrip()
})
else:
queue.put({
'message': line.rstrip()
})
else:
queue.put(line)
[docs]class LogProvider(object):
def __init__(self, chutename):
self.chutename = chutename
self.queue = Queue()
self.listening = False
[docs] def attach(self):
"""
Start listening for log messages.
Log messages in the queue will appear like the following:
{
'timestamp': '2017-01-30T15:46:23.009397536Z',
'message': 'Something happened'
}
"""
if not self.listening:
self.process = Process(target=monitor_logs, args=(self.chutename, self.queue))
self.process.start()
self.listening = True
[docs] def get_logs(self):
logs = []
while not self.queue.empty():
msg = self.queue.get()
logs.append(msg)
return logs
[docs] def detach(self):
"""
Stop listening for log messages.
After this is called, no additional messages will be added to the
queue.
"""
if self.listening:
# We have to kill the process explicitly with SIGKILL,
# terminate() function does not work here.
os.kill(self.process.pid, signal.SIGKILL)
# self.process.terminate()
# self.process.join()
self.listening = False