ctf: Rename Stream* classes to CTFStream*
[deliverable/tracecompass.git] / org.eclipse.linuxtools.lttng2.control.core / src / org / eclipse / linuxtools / internal / lttng2 / control / core / relayd / LttngRelaydConsumer.java
CommitLineData
8e15b929
MK
1/**********************************************************************
2 * Copyright (c) 2014 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial implementation
11 **********************************************************************/
12
13package org.eclipse.linuxtools.internal.lttng2.control.core.relayd;
14
15import java.io.File;
16import java.io.FileOutputStream;
17import java.io.IOException;
18import java.net.Socket;
19import java.util.List;
20import java.util.Map;
21import java.util.TreeMap;
22
23import org.eclipse.core.runtime.IProgressMonitor;
24import org.eclipse.core.runtime.IStatus;
25import org.eclipse.core.runtime.jobs.Job;
26import org.eclipse.linuxtools.ctf.core.trace.CTFReaderException;
27import org.eclipse.linuxtools.ctf.core.trace.Metadata;
d84419e1
AM
28import org.eclipse.linuxtools.ctf.core.trace.CTFStream;
29import org.eclipse.linuxtools.ctf.core.trace.CTFStreamInput;
8e15b929
MK
30import org.eclipse.linuxtools.internal.lttng2.control.core.Activator;
31import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.AttachSessionResponse;
32import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.LttngViewerCommands;
33import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.SessionResponse;
34import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.StreamResponse;
35import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.TracePacketResponse;
36import org.eclipse.linuxtools.tmf.ctf.core.CtfTmfTrace;
37
38/**
39 * Consumer of the relay d
40 *
41 * @author Matthew Khouzam
42 * @since 3.0
43 */
44public class LttngRelaydConsumer {
45
46 private final Job fConsumerJob;
47 private final String fAddress;
48 private final int fPort;
49 private final int fSession;
50 private final CtfTmfTrace fCtfTrace;
51 /**
52 * Map with the Lttng streams
53 *
54 * key: stream id value: stream file
55 */
56 private final Map<Long, File> fStreams = new TreeMap<>();
57
58 /**
59 * Start a lttng consumer
60 *
61 * @param address
62 * the ip address in string format
63 * @param port
64 * the port, an integer
65 * @param session
66 * the session id
67 * @param ctfTrace
68 * the parent trace
69 */
70 public LttngRelaydConsumer(String address, final int port, final int session, final CtfTmfTrace ctfTrace) {
71 fAddress = address;
72 fPort = port;
73 fSession = session;
74 fCtfTrace = ctfTrace;
d84419e1
AM
75 for (CTFStream s : fCtfTrace.getCTFTrace().getStreams()) {
76 for (CTFStreamInput si : s.getStreamInputs()) {
8e15b929
MK
77 fStreams.put(si.getStream().getId(), new File(si.getStream().getTrace().getPath() + si.getFilename()));
78 }
79 }
80
81 fConsumerJob = new Job("RelayD consumer") { //$NON-NLS-1$
82
83 @Override
84 protected IStatus run(IProgressMonitor monitor) {
85 try (Socket connection = new Socket(fAddress, fPort);
86 ILttngRelaydConnector relayd = LttngRelaydConnectorFactory.getNewConnector(connection);) {
87
88 List<SessionResponse> sessions = relayd.getSessions();
89 AttachSessionResponse attachedSession = relayd.attachToSession(sessions.get(fSession));
90
91 while (!monitor.isCanceled()) {
92
93 List<StreamResponse> attachedStreams = attachedSession.getStreamList();
94 for (StreamResponse stream : attachedStreams) {
95
96 TracePacketResponse packet = relayd.getNextPacket(stream);
97 // more streams
98 if ((packet.getFlags() & LttngViewerCommands.NEW_STREAM) == LttngViewerCommands.NEW_STREAM) {
99 Iterable<StreamResponse> newStreams = relayd.getNewStreams();
100 for (StreamResponse streamToAdd : newStreams) {
101
102 File f = new File(fCtfTrace.getPath() + File.separator + streamToAdd.getPathName() + streamToAdd.getChannelName());
103 // touch the file
104 f.setLastModified(System.currentTimeMillis());
105 fStreams.put(Long.valueOf(streamToAdd.getId()), f);
106 fCtfTrace.getCTFTrace().addStream(streamToAdd.getId(), f);
107
108 }
109
110 }
111 // more metadata
112 if ((packet.getFlags() & LttngViewerCommands.NEW_METADATA) == LttngViewerCommands.NEW_METADATA) {
113
114 String metaData = relayd.getMetadata(attachedSession);
115 (new Metadata(ctfTrace.getCTFTrace())).parseTextFragment(metaData);
116 }
117
118 try (FileOutputStream fos = new FileOutputStream(fStreams.get(stream.getId()), true)) {
119 fos.write(packet.getData());
120 monitor.worked(1);
121 }
122 }
123
124 }
125
126 } catch (IOException | CTFReaderException e) {
127 Activator.getDefault().logError("Error during live trace reading", e); //$NON-NLS-1$
128 }
129 return null;
130 }
131 };
132 fConsumerJob.schedule();
133 }
134
135}
This page took 0.029299 seconds and 5 git commands to generate.