Compare commits

...

9 Commits

Author SHA1 Message Date
Soumadipta Roy
a0c77c3e56 Refactor code for readability and cleanliness (#1195) 2025-03-27 18:43:12 +00:00
Soumadipta Roy
601b48d31c Context events (#1187) 2025-03-21 14:27:04 +00:00
Soumadipta Roy
449ccaccc4 Add span filter criteria and filtering logic. (#1186) 2025-03-21 14:06:09 +00:00
Andrei Pangin
de259147a7 SpanFilter 2025-03-21 12:06:57 +00:00
Andrei Pangin
67c7f3ced4 ThreadLocalData fixes 2025-03-21 11:18:45 +00:00
Andrei Pangin
7934615f21 Renamed Span event to SpanEvent to avoid confusion 2025-03-20 22:04:38 +00:00
Andrei Pangin
35d2089d86 ThreadLocalData fixes 2025-03-20 19:04:50 +00:00
Andrei Pangin
0116c4292b Added copyright headers 2025-03-20 18:50:46 +00:00
Andrei Pangin
b7ee5f60b2 Prototype Span events 2025-03-20 18:47:13 +00:00
19 changed files with 377 additions and 35 deletions

View File

@@ -9,6 +9,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
/**
* Java API for in-process profiling. Serves as a wrapper around
@@ -272,4 +273,8 @@ public class AsyncProfiler implements AsyncProfilerMXBean {
private native String execute0(String command) throws IllegalArgumentException, IllegalStateException, IOException;
private native void filterThread0(Thread thread, boolean enable);
static native ByteBuffer getThreadLocalBuffer();
static native void emitSpan(long startTime, long endTime, String payload);
}

View File

@@ -0,0 +1,38 @@
/*
* Copyright The async-profiler authors
* SPDX-License-Identifier: Apache-2.0
*/
package one.profiler;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public class Span {
private static final ThreadLocal<ByteBuffer> LAST_SAMPLE = new ThreadLocal<ByteBuffer>() {
@Override
protected ByteBuffer initialValue() {
try {
return AsyncProfiler.getThreadLocalBuffer().order(ByteOrder.nativeOrder());
} catch (UnsatisfiedLinkError e) {
return ByteBuffer.allocate(8).order(ByteOrder.nativeOrder());
}
}
};
private final long startTime;
private final String tag;
public Span(String tag) {
this.startTime = System.nanoTime();
this.tag = tag;
}
public void commit() {
long lastSampleTime = LAST_SAMPLE.get().getLong(0);
if (lastSampleTime - startTime > 0) {
AsyncProfiler.emitSpan(startTime, System.nanoTime(), tag);
}
}
}

View File

@@ -51,5 +51,5 @@ DLLEXPORT asprof_error_t asprof_execute(const char* command, asprof_writer_t out
}
DLLEXPORT asprof_thread_local_data* asprof_get_thread_local_data(void) {
return ThreadLocalData::getThreadLocalData();
return ThreadLocalData::get();
}

View File

@@ -17,6 +17,7 @@ public class Arguments {
public String state;
public Pattern include;
public Pattern exclude;
public String filter;
public double minwidth;
public double grain;
public int skip;

View File

@@ -29,7 +29,13 @@ public abstract class JfrConverter extends Classifier {
this.args = args;
EventCollector collector = createCollector(args);
this.collector = args.nativemem && args.leak ? new MallocLeakAggregator(collector) : collector;
if (args.nativemem && args.leak) {
collector = new MallocLeakAggregator(collector);
}
if (args.filter != null) {
collector = new SpanFilter(jfr, collector, args.filter);
}
this.collector = collector;
}
public void convert() throws IOException {

View File

@@ -39,6 +39,7 @@ public class JfrReader implements Closeable {
private ByteBuffer buf;
private final long fileSize;
private long filePosition;
private long rewindPosition;
private byte state;
public long startNanos = Long.MAX_VALUE;
@@ -75,6 +76,7 @@ public class JfrReader implements Closeable {
private int activeSetting;
private int malloc;
private int free;
private int span;
public JfrReader(String fileName) throws IOException {
this.ch = FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);
@@ -134,6 +136,11 @@ public class JfrReader implements Closeable {
return state == STATE_NEW_CHUNK ? readChunk(buf.position()) : state == STATE_READING;
}
public void rewindChunk() throws IOException {
seek(rewindPosition);
state = STATE_READING;
}
public List<Event> readAllEvents() throws IOException {
return readAllEvents(null);
}
@@ -186,6 +193,8 @@ public class JfrReader implements Closeable {
if (cls == null || cls == ContendedLock.class) return (E) readContendedLock(false);
} else if (type == threadPark) {
if (cls == null || cls == ContendedLock.class) return (E) readContendedLock(true);
} else if (type == span) {
if (cls == null || cls == SpanEvent.class) return (E) readSpan();
} else if (type == activeSetting) {
readActiveSetting();
} else {
@@ -258,6 +267,14 @@ public class JfrReader implements Closeable {
return new ContendedLock(time, tid, stackTraceId, duration, classId);
}
private SpanEvent readSpan() {
long time = getVarlong();
long duration = getVarlong();
int tid = getVarint();
String tag = getString();
return new SpanEvent(time, tid, duration, tag);
}
private void readActiveSetting() {
for (JfrField field : typesByName.get("jdk.ActiveSetting").fields) {
getVarlong();
@@ -310,7 +327,8 @@ public class JfrReader implements Closeable {
readConstantPool(chunkStart + cpOffset);
cacheEventTypes();
seek(chunkStart + CHUNK_HEADER_SIZE);
rewindPosition = chunkStart + CHUNK_HEADER_SIZE;
seek(rewindPosition);
state = STATE_READING;
return true;
}
@@ -557,6 +575,7 @@ public class JfrReader implements Closeable {
activeSetting = getTypeId("jdk.ActiveSetting");
malloc = getTypeId("profiler.Malloc");
free = getTypeId("profiler.Free");
span = getTypeId("profiler.Span");
registerEvent("jdk.CPULoad", CPULoad.class);
registerEvent("jdk.GCHeapSummary", GCHeapSummary.class);

View File

@@ -0,0 +1,38 @@
/*
* Copyright The async-profiler authors
* SPDX-License-Identifier: Apache-2.0
*/
package one.jfr.event;
import java.util.Objects;
public class SpanEvent extends Event {
public final long duration;
public final String tag;
public SpanEvent(long time, int tid, long duration, String tag) {
super(time, tid, 0);
this.duration = duration;
this.tag = tag;
}
@Override
public int hashCode() {
return tag != null ? tag.hashCode() : 0;
}
@Override
public boolean sameGroup(Event o) {
if (o instanceof SpanEvent) {
SpanEvent s = (SpanEvent) o;
return Objects.equals(tag, s.tag);
}
return false;
}
@Override
public long value() {
return duration;
}
}

View File

@@ -0,0 +1,87 @@
/*
* Copyright The async-profiler authors
* SPDX-License-Identifier: Apache-2.0
*/
package one.jfr.event;
import one.jfr.JfrReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SpanFilter implements EventCollector {
private final Map<Integer, List<SpanEvent>> spans = new HashMap<>();
private final JfrReader jfr;
private final EventCollector wrapped;
private final SpanFilterCriteria spanFilterCriteria;
public SpanFilter(JfrReader jfr, EventCollector wrapped, String filter) {
this.jfr = jfr;
this.wrapped = wrapped;
this.spanFilterCriteria = new SpanFilterCriteria(filter);
}
private boolean shouldCollect(int tid, long time) {
List<SpanEvent> threadSpans = spans.get(tid);
if (threadSpans == null || threadSpans.isEmpty()) {
return false;
}
int index = Collections.binarySearch(
threadSpans,
null,
(span1, span2) -> {
if (time >= span1.time && time <= span1.time + span1.duration) {
return 0;
}
return Long.compare(span1.time, time);
}
);
return index >= 0;
}
@Override
public void collect(Event e) {
if (shouldCollect(e.tid, e.time)) {
wrapped.collect(e);
}
}
@Override
public void beforeChunk() {
try {
for (SpanEvent event; (event = jfr.readEvent(SpanEvent.class)) != null; ) {
if (spanFilterCriteria.matches(event)) {
spans.computeIfAbsent(event.tid, tid -> new ArrayList<>()).add(event);
}
}
spans.forEach((tid, spanList) -> Collections.sort(spanList));
jfr.rewindChunk();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void afterChunk() {
spans.clear();
}
@Override
public boolean finish() {
return false;
}
@Override
public void forEach(Visitor visitor) {
wrapped.forEach(visitor);
}
}

View File

@@ -0,0 +1,98 @@
/*
* Copyright The async-profiler authors
* SPDX-License-Identifier: Apache-2.0
*/
package one.jfr.event;
public class SpanFilterCriteria {
private final String tag;
private final String operator;
private final long duration;
public SpanFilterCriteria(String filter) {
validateFilter(filter);
String[] parts = filter.split(",");
String parsedTag = null;
String parsedOperator = null;
long parsedDuration = 0;
for (String part : parts) {
part = part.trim();
if (part.startsWith("tag=")) {
if (parsedTag != null) {
throw new IllegalArgumentException("Tag parameter specified multiple times");
}
parsedTag = parseTag(part);
} else if (part.startsWith("duration")) {
if (parsedOperator != null) {
throw new IllegalArgumentException("Duration parameter specified multiple times");
}
parsedOperator = parseOperator(part);
parsedDuration = parseDuration(part, parsedOperator);
} else {
throw new IllegalArgumentException("Invalid filter part. Expected: tag=value or duration(op)value");
}
}
this.tag = parsedTag;
this.operator = parsedOperator;
this.duration = parsedDuration;
}
private void validateFilter(String filter) {
if (filter == null || filter.trim().isEmpty()) {
throw new IllegalArgumentException("Filter cannot be null or empty");
}
}
private String parseTag(String part) {
String[] tagPart = part.split("=");
if (tagPart.length != 2) {
throw new IllegalArgumentException("Invalid tag format. Expected: tag=value");
}
return tagPart[1].trim();
}
private String parseOperator(String part) {
String durationStr = part.substring("duration".length()).trim();
if (durationStr.startsWith(">=")) return ">=";
if (durationStr.startsWith("<=")) return "<=";
if (durationStr.startsWith(">")) return ">";
if (durationStr.startsWith("<")) return "<";
if (durationStr.startsWith("=")) return "=";
throw new IllegalArgumentException("Invalid operator. Expected: >=, <=, >, <, or =");
}
private Long parseDuration(String part, String operator) {
String durationStr = part.substring("duration".length()).trim();
String value = durationStr.substring(operator.length()).trim();
try {
return Long.parseLong(value);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid duration value. Expected a number!", e);
}
}
public boolean matches(SpanEvent event) {
if (tag != null && !event.tag.equals(tag)) {
return false;
}
if (duration > 0) {
Long durationMs = event.duration;
switch (operator) {
case ">=": return durationMs >= duration;
case "<=": return durationMs <= duration;
case ">": return durationMs > duration;
case "<": return durationMs < duration;
case "=": return durationMs == duration;
default: return false;
}
}
return true;
}
}

View File

@@ -23,9 +23,12 @@ enum EventType {
LOCK_SAMPLE,
PARK_SAMPLE,
PROFILING_WINDOW,
SPAN,
};
class Event {
public:
u64 _start_time;
};
class EventWithClassId : public Event {
@@ -35,29 +38,28 @@ class EventWithClassId : public Event {
class ExecutionEvent : public Event {
public:
u64 _start_time;
ThreadState _thread_state;
ExecutionEvent(u64 start_time) : _start_time(start_time), _thread_state(THREAD_UNKNOWN) {}
ExecutionEvent(u64 start_time) {
_start_time = start_time;
_thread_state = THREAD_UNKNOWN;
}
};
class WallClockEvent : public Event {
public:
u64 _start_time;
ThreadState _thread_state;
u32 _samples;
};
class AllocEvent : public EventWithClassId {
public:
u64 _start_time;
u64 _total_size;
u64 _instance_size;
};
class LockEvent : public EventWithClassId {
public:
u64 _start_time;
u64 _end_time;
uintptr_t _address;
long long _timeout;
@@ -65,22 +67,20 @@ class LockEvent : public EventWithClassId {
class LiveObject : public EventWithClassId {
public:
u64 _start_time;
u64 _alloc_size;
u64 _alloc_time;
};
class ProfilingWindow : public Event {
public:
u64 _start_time;
u64 _end_time;
};
class MallocEvent : public Event {
public:
u64 _start_time;
uintptr_t _address;
u64 _size;
};
class SpanEvent : public Event {
public:
u64 _end_time;
const char* _tag;
};
#endif // _EVENT_H

View File

@@ -1289,7 +1289,7 @@ class Recording {
buf->put8(start, buf->offset() - start);
}
void recordWindow(Buffer* buf, int tid, ProfilingWindow* event) {
void recordWindow(Buffer* buf, int tid, SpanEvent* event) {
int start = buf->skip(1);
buf->put8(T_WINDOW);
buf->putVar64(event->_start_time);
@@ -1298,6 +1298,19 @@ class Recording {
buf->put8(start, buf->offset() - start);
}
void recordSpan(Buffer* buf, int tid, SpanEvent* event) {
// Considering MAX_STRING_LENGTH, total event size will always fit into two bytes
int start = buf->skip(2);
buf->put8(T_SPAN);
buf->putVar64(event->_start_time);
buf->putVar64(event->_end_time - event->_start_time);
buf->putVar32(tid);
buf->putUtf8(event->_tag);
u32 size = buf->offset() - start;
buf->put8(start, size | 0x80);
buf->put8(start + 1, size >> 7);
}
void recordCpuLoad(Buffer* buf, float proc_user, float proc_system, float machine_total) {
int start = buf->skip(1);
buf->put8(T_CPU_LOAD);
@@ -1495,7 +1508,10 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u32 call_trace_id,
if (_rec != NULL) {
// Recording an event, increment the sample counter to allow
// user code to attach metadata.
ThreadLocalData::incrementSampleCounter();
asprof_thread_local_data* tld = ThreadLocalData::getIfPresent();
if (tld != NULL) {
tld->sample_counter = event->_start_time;
}
Buffer* buf = _rec->buffer(lock_index);
switch (event_type) {
@@ -1526,7 +1542,10 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u32 call_trace_id,
_rec->recordThreadPark(buf, tid, call_trace_id, (LockEvent*)event);
break;
case PROFILING_WINDOW:
_rec->recordWindow(buf, tid, (ProfilingWindow*)event);
_rec->recordWindow(buf, tid, (SpanEvent*)event);
break;
case SPAN:
_rec->recordSpan(buf, tid, (SpanEvent*)event);
break;
}
_rec->flushIfNeeded(buf);

View File

@@ -10,6 +10,7 @@
#include "javaApi.h"
#include "os.h"
#include "profiler.h"
#include "threadLocalData.h"
#include "vmStructs.h"
@@ -117,15 +118,37 @@ Java_one_profiler_AsyncProfiler_filterThread0(JNIEnv* env, jobject unused, jthre
}
}
extern "C" DLLEXPORT jobject JNICALL
Java_one_profiler_AsyncProfiler_getThreadLocalBuffer(JNIEnv* env, jclass unused) {
asprof_thread_local_data* tld = ThreadLocalData::get();
return tld == NULL ? NULL : env->NewDirectByteBuffer(tld, sizeof(*tld));
}
extern "C" DLLEXPORT void JNICALL
Java_one_profiler_AsyncProfiler_emitSpan(JNIEnv* env, jclass unused, jlong start_time, jlong end_time, jstring tag) {
SpanEvent event;
event._start_time = start_time;
event._end_time = end_time;
event._tag = tag != NULL ? env->GetStringUTFChars(tag, NULL) : NULL;
// TODO: Replace with a mutex-guarded buffer
Profiler::instance()->recordEventOnly(SPAN, &event);
if (tag != NULL) {
env->ReleaseStringUTFChars(tag, event._tag);
}
}
#define F(name, sig) {(char*)#name, (char*)sig, (void*)Java_one_profiler_AsyncProfiler_##name}
static const JNINativeMethod profiler_natives[] = {
F(start0, "(Ljava/lang/String;JZ)V"),
F(stop0, "()V"),
F(execute0, "(Ljava/lang/String;)Ljava/lang/String;"),
F(getSamples, "()J"),
F(filterThread0, "(Ljava/lang/Thread;Z)V"),
F(start0, "(Ljava/lang/String;JZ)V"),
F(stop0, "()V"),
F(execute0, "(Ljava/lang/String;)Ljava/lang/String;"),
F(getSamples, "()J"),
F(filterThread0, "(Ljava/lang/Thread;Z)V"),
F(getThreadLocalBuffer, "()Ljava/nio/ByteBuffer;"),
F(emitSpan, "(JJLjava/lang/String;)V"),
};
static const JNINativeMethod* execute0 = &profiler_natives[2];

View File

@@ -244,6 +244,13 @@ JfrMetadata::JfrMetadata() : Element("root") {
<< field("stackTrace", T_STACK_TRACE, "Stack Trace", F_CPOOL)
<< field("address", T_LONG, "Address", F_ADDRESS))
<< (type("profiler.Span", T_SPAN, "Span")
<< category("Profiler")
<< field("startTime", T_LONG, "Start Time", F_TIME_TICKS)
<< field("duration", T_LONG, "Duration", F_DURATION_TICKS)
<< field("eventThread", T_THREAD, "Event Thread", F_CPOOL)
<< field("tag", T_STRING, "Tag"))
<< (type("jdk.jfr.Label", T_LABEL, NULL)
<< field("value", T_STRING))

View File

@@ -62,6 +62,7 @@ enum JfrType {
T_WALL_CLOCK_SAMPLE = 118,
T_MALLOC = 119,
T_FREE = 120,
T_SPAN = 121,
T_ANNOTATION = 200,
T_LABEL = 201,

View File

@@ -8,6 +8,7 @@
#include <arpa/inet.h>
#include <byteswap.h>
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <sched.h>
#include <stdio.h>

View File

@@ -57,7 +57,7 @@ static CTimer ctimer;
static ITimer itimer;
static Instrument instrument;
static ProfilingWindow profiling_window;
static SpanEvent profiling_window;
// The same constants are used in JfrSync

View File

@@ -22,6 +22,7 @@
#include "dwarf.h"
#include "fdtransferClient.h"
#include "log.h"
#include "os.h"
#ifdef __x86_64__

View File

@@ -11,22 +11,19 @@
class ThreadLocalData {
public:
// Increment the thread-local sample counter. See the `asprof_thread_local_data` docs.
//
// This function *is* async-signal safe, and therefore will not initialize the thread-local
// storage by itself, but will rather do nothing if it's not initialized already. This works
// fine with the `sample_counter` API since only changes in `sample_counter` matter.
static void incrementSampleCounter(void) {
if (_profiler_data_key == -1) return;
asprof_thread_local_data* data = (asprof_thread_local_data*) pthread_getspecific(_profiler_data_key);
if (data != NULL) {
data->sample_counter++;
static asprof_thread_local_data* getIfPresent() {
if (_profiler_data_key == -1) {
return NULL;
}
return (asprof_thread_local_data*) pthread_getspecific(_profiler_data_key);
}
// Get the `asprof_thread_local_data`. See the `asprof_get_thread_local_data` docs.
static asprof_thread_local_data* getThreadLocalData(void) {
static asprof_thread_local_data* get() {
if (_profiler_data_key == -1) {
return NULL;
}

View File

@@ -15,7 +15,8 @@ u64 TSC::_offset = 0;
u64 TSC::_frequency = NANOTIME_FREQ;
void TSC::enable(Clock clock) {
if (!TSC_SUPPORTED || clock == CLK_MONOTONIC) {
// TODO: support TSC clock with context events
if (!TSC_SUPPORTED || clock == CLK_MONOTONIC || true) {
_enabled = false;
return;
}