Commit | Line | Data |
---|---|---|
8e15b929 MK |
1 | /********************************************************************** |
2 | * Copyright (c) 2014 Ericsson | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Matthew Khouzam - Initial implementation | |
11 | **********************************************************************/ | |
12 | ||
13 | package org.eclipse.linuxtools.internal.lttng2.control.core.relayd; | |
14 | ||
15 | import java.io.File; | |
16 | import java.io.FileOutputStream; | |
17 | import java.io.IOException; | |
18 | import java.net.Socket; | |
19 | import java.util.List; | |
20 | import java.util.Map; | |
21 | import java.util.TreeMap; | |
22 | ||
23 | import org.eclipse.core.runtime.IProgressMonitor; | |
24 | import org.eclipse.core.runtime.IStatus; | |
25 | import org.eclipse.core.runtime.jobs.Job; | |
26 | import org.eclipse.linuxtools.ctf.core.trace.CTFReaderException; | |
27 | import org.eclipse.linuxtools.ctf.core.trace.Metadata; | |
d84419e1 AM |
28 | import org.eclipse.linuxtools.ctf.core.trace.CTFStream; |
29 | import org.eclipse.linuxtools.ctf.core.trace.CTFStreamInput; | |
8e15b929 MK |
30 | import org.eclipse.linuxtools.internal.lttng2.control.core.Activator; |
31 | import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.AttachSessionResponse; | |
32 | import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.LttngViewerCommands; | |
33 | import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.SessionResponse; | |
34 | import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.StreamResponse; | |
35 | import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.TracePacketResponse; | |
36 | import org.eclipse.linuxtools.tmf.ctf.core.CtfTmfTrace; | |
37 | ||
38 | /** | |
39 | * Consumer of the relay d | |
40 | * | |
41 | * @author Matthew Khouzam | |
42 | * @since 3.0 | |
43 | */ | |
44 | public class LttngRelaydConsumer { | |
45 | ||
46 | private final Job fConsumerJob; | |
47 | private final String fAddress; | |
48 | private final int fPort; | |
49 | private final int fSession; | |
50 | private final CtfTmfTrace fCtfTrace; | |
51 | /** | |
52 | * Map with the Lttng streams | |
53 | * | |
54 | * key: stream id value: stream file | |
55 | */ | |
56 | private final Map<Long, File> fStreams = new TreeMap<>(); | |
57 | ||
58 | /** | |
59 | * Start a lttng consumer | |
60 | * | |
61 | * @param address | |
62 | * the ip address in string format | |
63 | * @param port | |
64 | * the port, an integer | |
65 | * @param session | |
66 | * the session id | |
67 | * @param ctfTrace | |
68 | * the parent trace | |
69 | */ | |
70 | public LttngRelaydConsumer(String address, final int port, final int session, final CtfTmfTrace ctfTrace) { | |
71 | fAddress = address; | |
72 | fPort = port; | |
73 | fSession = session; | |
74 | fCtfTrace = ctfTrace; | |
d84419e1 AM |
75 | for (CTFStream s : fCtfTrace.getCTFTrace().getStreams()) { |
76 | for (CTFStreamInput si : s.getStreamInputs()) { | |
8e15b929 MK |
77 | fStreams.put(si.getStream().getId(), new File(si.getStream().getTrace().getPath() + si.getFilename())); |
78 | } | |
79 | } | |
80 | ||
81 | fConsumerJob = new Job("RelayD consumer") { //$NON-NLS-1$ | |
82 | ||
83 | @Override | |
84 | protected IStatus run(IProgressMonitor monitor) { | |
85 | try (Socket connection = new Socket(fAddress, fPort); | |
86 | ILttngRelaydConnector relayd = LttngRelaydConnectorFactory.getNewConnector(connection);) { | |
87 | ||
88 | List<SessionResponse> sessions = relayd.getSessions(); | |
89 | AttachSessionResponse attachedSession = relayd.attachToSession(sessions.get(fSession)); | |
90 | ||
91 | while (!monitor.isCanceled()) { | |
92 | ||
93 | List<StreamResponse> attachedStreams = attachedSession.getStreamList(); | |
94 | for (StreamResponse stream : attachedStreams) { | |
95 | ||
96 | TracePacketResponse packet = relayd.getNextPacket(stream); | |
97 | // more streams | |
98 | if ((packet.getFlags() & LttngViewerCommands.NEW_STREAM) == LttngViewerCommands.NEW_STREAM) { | |
99 | Iterable<StreamResponse> newStreams = relayd.getNewStreams(); | |
100 | for (StreamResponse streamToAdd : newStreams) { | |
101 | ||
102 | File f = new File(fCtfTrace.getPath() + File.separator + streamToAdd.getPathName() + streamToAdd.getChannelName()); | |
103 | // touch the file | |
104 | f.setLastModified(System.currentTimeMillis()); | |
105 | fStreams.put(Long.valueOf(streamToAdd.getId()), f); | |
106 | fCtfTrace.getCTFTrace().addStream(streamToAdd.getId(), f); | |
107 | ||
108 | } | |
109 | ||
110 | } | |
111 | // more metadata | |
112 | if ((packet.getFlags() & LttngViewerCommands.NEW_METADATA) == LttngViewerCommands.NEW_METADATA) { | |
113 | ||
114 | String metaData = relayd.getMetadata(attachedSession); | |
115 | (new Metadata(ctfTrace.getCTFTrace())).parseTextFragment(metaData); | |
116 | } | |
117 | ||
118 | try (FileOutputStream fos = new FileOutputStream(fStreams.get(stream.getId()), true)) { | |
119 | fos.write(packet.getData()); | |
120 | monitor.worked(1); | |
121 | } | |
122 | } | |
123 | ||
124 | } | |
125 | ||
126 | } catch (IOException | CTFReaderException e) { | |
127 | Activator.getDefault().logError("Error during live trace reading", e); //$NON-NLS-1$ | |
128 | } | |
129 | return null; | |
130 | } | |
131 | }; | |
132 | fConsumerJob.schedule(); | |
133 | } | |
134 | ||
135 | } |